fix: Fix derived table breaks on with joins #573
base: main
Conversation
TEST_F(PlanTest, outerJoinWithInnerJoin) {
This test passes on current main; is that expected?
#define AXIOM_ASSERT_PLAN(plan, matcher) \
  ASSERT_TRUE(matcher->match(plan)) << plan->toString(true, true);
This macro is already defined in this file.
// For example on the right of left outer join, a filter must not go to the enclosing dt but must make its own dt.
if (!contains(allowedInDt, PlanType::kFilterNode)) {
  return wrapInDt(node);
}
There's no test for this. Even if we allow a filter on the right side of a non-inner join, the outerJoinWithInnerJoin test still passes.
Force-pushed c585092 to 9454b48
Force-pushed 6281f34 to 77b66bb
ToGraph builds derived tables (dts) bottom up. On the returning edge of the recursion we add joins or dt postprocessing steps like group by or limit. When something does not fit the implied processing order of a dt, e.g. joins, group by, having, order by, limit/offset, we wrap the plan so far in another dt. For example, scan, limit, filter, limit would have (scan, limit) in a dt, with the filter and the second limit in a dt containing the first one.

Now, when a dt as above is to the right of a join, we must start the right-side dt from scratch: its table set must start empty and must not contain tables from the left side. On the other hand, for a right side that does not introduce a dt break, we must add the tables on the right to the dt where the left side was added. To this effect we now set and check allowedInDt correctly for filters as well.
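The dt-break decision above can be sketched roughly as follows. This is a hedged illustration, not the actual ToGraph code: the PlanType enum values beyond kFilterNode and the needsDtBreak helper are hypothetical names; only contains(allowedInDt, PlanType::kFilterNode) and wrapInDt appear in the diff itself.

```cpp
#include <algorithm>
#include <vector>

// Hypothetical stand-ins for the optimizer's types; names are assumptions.
enum class PlanType { kFilterNode, kJoinNode, kAggregationNode, kLimitNode };

bool contains(const std::vector<PlanType>& allowed, PlanType type) {
  return std::find(allowed.begin(), allowed.end(), type) != allowed.end();
}

// A node whose type is not in 'allowedInDt' cannot be merged into the
// enclosing derived table and must start a fresh dt of its own. For
// example, on the right of a left outer join, kFilterNode is excluded
// from 'allowedInDt', so a filter there gets its own dt whose table set
// starts empty and does not see tables from the left side.
bool needsDtBreak(PlanType type, const std::vector<PlanType>& allowedInDt) {
  return !contains(allowedInDt, type);
}
```

The fix amounts to populating allowedInDt so that this check fires for filters on the right of an outer join, where it previously did not.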
Force-pushed 77b66bb to a86b10a
Summary: Extracted from facebookincubator#573 Differential Revision: D87333918
Summary: Extracted from facebookincubator#573 Co-authored-by: oerling Differential Revision: D87333918
Summary: Fix cardinality estimate for aggregation. This fixes the plan for TPC-H q16. Extracted from facebookincubator#573 Co-authored-by: oerling Differential Revision: D87333918
Summary: Pull Request resolved: #639

Fix cardinality estimate and planning logic for aggregations. This fixes the plan for TPC-H q16.

Aggregation is planned as follows. If there is 1 worker and 1 thread, all aggregations are planned as single aggregations. A global aggregation always has partial + final, since combining values on a single thread is never better than combining them on multiple threads with a guaranteed reduction to one row per thread. Otherwise we compare a plan with a partial aggregation to a plan with a single aggregation and pick the one with the lower cost. See Optimization::addAggregation(). For use cases where statistics are not available, we provide an option to disable cost-based decisions and always split aggregation into partial + final: alwaysPlanPartialAggregation.

The cost of group-by is computed in Aggregation::setCostWithGroups(). This has an auxiliary function maxGroups() which calculates the maximum number of possible combinations of grouping keys. We distinguish between the number of input rows and the number of distinct rows the actual input is drawn from. We calculate the size of the input domain by grouping the keys by table. If a key depends on many tables, we assign the key to its largest table. If there is a single key per table in the grouping keys, we use its cardinality, which is always <= the cardinality of the table. If there are many keys from the table, we use a saturating product function to multiply the key cardinalities, capped at the table cardinality, for the count of distinct combinations. If the product is far from the max, this behaves like a multiplication; if the product is close to the max, the function approaches the max as the product goes to infinity. In this way, more keys produce more groups but will not overflow the table size. If we have many tables, we combine the per-table cardinalities with a saturating product. Here we use the greater of 3 * largest table size and an arbitrary 1e10, which is a proxy for a large group by (10 billion groups).

Having determined the size of the pool from which the input is drawn, we use the coupon collector formula to see what reduction we expect. The size of the group-by is expectedNumDistincts(inputRowCount, maxGroups). This uses the coupon collector formula to estimate how many distinct values will occur in n tuples of input if there are k possible distinct tuples in the input. We apply this formula to get the total expected fanout: numDistinct / numInput, which is always <= 1.

## Cost of partial aggregation

The partial capacity is the number of rows at the predicted width that fit in the memory budget for partial aggregation. We see how many rows of input it takes to get this many distinct groups in the input of the partial aggregation. With few groups this is a large number; with every row unique it is the capacity of the partial aggregation. The partial capacity divided by the number of inputs needed to hit capacity is the reduction from partial. This reduction cannot, however, be greater (a lesser fanout) than the total reduction. Given this reduction and the expected reduction in the first "abandon partial min rows" rows, we see whether partial will be abandoned at run time, and set the cost and cardinality accordingly. An abandoned partial has a fanout of 1 and a low cost per byte of input row.

## Cost of final / single aggregation

If we are planning a final aggregation, we scale the input by the reduction expected from partial. This reduces the number of rows shuffled into the final aggregation but does not reduce the size of the hash table: the final table will in the end contain all distinct groups from the input. We check if we have a local exchange, which is the case if we have more than one thread per worker. If so, we add a fraction of the shuffle cost as the cost of the local exchange.

The cost has unitCost (per-row cost), total memory, and fanout. The unit cost formula is the same for partial, final and single aggregations. The cost includes:
- Compute the hash of the grouping keys := Costs::kHashColumnCost * numKeys
- Look up the hash in the hash table := Costs::hashTableCost(nOut)
- Compare the keys := kKeyCompareCost * numKeys
- Access the row of the hash table (twice) := 2 * Costs::hashRowCost(nOut, rowBytes)
- Update accumulators := aggregates.size() * Costs::kSimpleAggregateCost

The memory estimate is the estimated row size plus a typical overhead of 12. The table size is the number of distinct values at 12 bytes per value. The formula represents the added memory cost for larger tables. A more precise formula could be learned from examples.

The intended function of these mechanisms is to decide between partial + final aggregation and a single level of aggregation. Additionally, the cost should be comparable to the costs predicted for other operations like joins and shuffles.

Extracted from #573
Reviewed By: xiaoxmeng
Differential Revision: D87333918
fbshipit-source-id: 23ffab1ace0ff3332491d33d847fb4dec7cbe428
Co-authored-by: oerling
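The two estimators at the heart of the description above can be sketched as below. The coupon collector estimate is the standard closed form for expected distinct values; the saturating-product shape is one plausible form matching the description (behaves like multiplication far from the max, approaches the max asymptotically). The actual formulas in setCostWithGroups()/maxGroups() may differ in detail.

```cpp
#include <cmath>

// Coupon-collector estimate: expected number of distinct tuples seen in
// 'n' input rows drawn uniformly from 'k' possible distinct tuples:
//   k * (1 - (1 - 1/k)^n)
// The expected fanout numDistinct / numInput is then always <= 1.
double expectedNumDistincts(double n, double k) {
  if (k <= 0 || n <= 0) {
    return 0;
  }
  return k * (1 - std::pow(1 - 1 / k, n));
}

// Saturating product (assumed form): behaves like a plain product while
// a * b is far below 'max' and approaches 'max' as the product grows, so
// extra grouping keys add groups without overflowing the table size.
double saturatingProduct(double a, double b, double max) {
  return max * (1 - std::exp(-(a * b) / max));
}
```

For example, with 1000 input rows drawn from only 10 possible key combinations, expectedNumDistincts returns essentially 10, i.e. a group-by fanout of about 0.01.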